#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"

#include "rmvol_bh.h"
#include "volume_bh.h"
#include "volume_ctl.h"

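/**
 * @file Snapshot tree management
 *
 * Layout of the snapshot tree (currently at most 256 nodes):
 * - the root snap is a virtual snapshot
 * - intermediate nodes are user snapshots
 * - every leaf node is either an auto snap or the volume
 *
 */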

/**
 * snapshot cow flow by Gabe:
 *
 *  /iscsi/vol1
 *  `-- lich_system_root ver:0 from:-1 allocate:0
 *      `-- snap1 ver:1 from:0 allocate:0
 *          |-- snap2 ver:2 from:1 allocate:0
 *          |   `-- lich_system_curr ver:6 from:2 allocate:0
 *          `-- snap3 ver:4 from:1 allocate:0
 *              `-- lich_system_auto.caa1c27f-8959-4500-b862-810791270c42 ver:5 from:4 allocate:0
 *
 * manual: http://192.168.1.7:8090/pages/viewpage.action?pageId=6554189
 * step:
 * 1. decide whether an auto snap is needed: if this node has no sibling node, an auto snapshot must be created to back up the data.
 * 2. backup: build a snap list (from the common parent node to the current node); if a chunk was modified within the list but not in the last snap of the list, it must be backed up to the auto snap.
 * 3. rollback: read data from the target snap and its child snaps first; if not found, look it up in the built list.
 * 4. cleanup auto snapshot: if there is an auto snap among the children of the target snapshot, that auto snapshot must be removed.
 *
 * time line
 *          |   +-----------+
 *          |   | a | b | c |    vol
 *          |   +-----------+
 *          |                                        time line
 *          |   +-----------+                                 |   +-----------+
 *   snap1--|   | a |   |   |    snap1    .-------------->    |   | a | b | c |    vol
 *          |   +-----------+             |                   |   +-----------+
 *          |     ^                       |                   |
 *          |     |                       |                   |
 *          |   +-----------+             |                   |   +-----------+
 *   a->a1--|   | a1| b | c |    vol      |            snap3--|   |   |   | c |    snap3
 *          |   +-----------+             |                   |   +-----------+
 *          |         ^                   |                   |             ^
 *          |         |                   |                   |             |                          time line
 *          |   +-----------+             |                   |   +-----------+                                 |   +-----------+
 *   snap2--|   |   | b |   |    snap2    |            c->c1--|   | a | b | c1|    vol      .-------------->    |   | a1| b | c |    vol
 *          |   +-----------+             |                   |   +-----------+             |                   |   +-----------+
 *          |                             |                   |                             |                   |
 *          |   +-----------+             |                   |                             |                   |
 *   b->b1--|   | a1| b1| c |    vol      |                   |   +-----------+             |                   V
 *          |   +-----------+             |          rollback-|   |   |   |   |  auto(rollback to snap2): build list(snap1 --> snap3 --> curr)
 *          |     |                       |                   |   +-----------+  a. backup:   not modify, do not need backup
 *          |     V                       |                   |                     rollback: chunk data not in snap2, but found in auto snap, so rollback to auto data(a1)
 *          |   +-----------+             |                   V                  b. backup:   not modify, do not need backup
 * rollback-|   | a1|   |   |  auto(rollback to snap1):                             rollback: chunk data in snap2, so rollback to snap2 data(b)
 *          |   +-----------+                                                    c. backup:   modify in last snap, so do not need backup
 *          |                                                                       rollback: chunk data not in snap2, and not in auto snap too, so find in list, rollback to snap3 data(c)
 *          |    build list(snap1 --> snap2 --> curr)
 *          | a. backup:   modify between snap1 and snap2,so need save a1 to auto snapshot.
 *          |    rollback: chunk data in snap1, so rollback snap1 data(a) direct
 *          | b. backup:   modify after snap2, do not need save, just drop it.
 *          v    rollback: chunk data not in snap1, so read from child snap, found in snap2, rollback to snap2 data(b)
 *            c. backup:   not modify, do not need save. data not change
 *               rollback: chunk data not in snap1, so read from child snap, not in snap2 also, so, data in self, not change
 *
 */

/*
 * Argument bundle: a volume handle plus two integer flags.
 * NOTE(review): no user of this type is visible in this part of the file;
 * confirm the exact meaning of `need` and `force` against the caller.
 */
typedef struct {
        volume_proto_t *volume_proto;
        int need;
        int force;
} arg_t;

STATIC int __volume_proto_snapshot_cleanup_direct(volume_proto_t *volume_proto, const char *name);
STATIC int __volume_proto_snapshot_chunk_exist(const nid_t *snapnid,
                                               const fileid_t *snapid, int idx, int *exist);

/*
 * Unlink every node hanging off @list and release it with yfree().
 * The "safe" iterator is used because the current node is freed inside the
 * loop body; the list head itself is not freed.
 */
static inline void __snapshot_list_free(struct list_head *list)
{
        struct list_head *node, *next;

        list_for_each_safe(node, next, list) {
                list_del(node);
                yfree((void **)&node);
        }
}

#define SNAPSHOT_EXISTS_WITH_PEEK TRUE
/*
 * Report through @exist whether chunk @chkid is present in snapshot @snapid
 * hosted on node @snapnid.
 *
 * With SNAPSHOT_EXISTS_WITH_PEEK the answer is obtained by a peek read of the
 * chunk's range: success means present, ENOENT/ENOKEY means absent, any other
 * error is returned to the caller. Every call bumps the rollback
 * `snap_exist` statistic.
 *
 * Returns 0 when @exist holds a definite answer, otherwise an error code.
 */
static int __is_chunk_in_snapshot(volume_proto_t *volume_proto,
                                  const nid_t *snapnid, const chkid_t *snapid,
                                  const chkid_t *chkid, int *exist)
{
        int ret;

        volume_proto->rollback_ctx.stat.snap_exist++;

#if SNAPSHOT_EXISTS_WITH_PEEK
        uint64_t chunk_off, remain;
        uint32_t len;
        io_t io;
        buffer_t buf;

        /* assume the chunk is there until the peek read proves otherwise */
        *exist = 1;

        chunk_off = (LLU)chkid->idx * LICH_CHUNK_SPLIT;
        remain = volume_proto->table1.fileinfo.size - chunk_off;
        len = _min(remain, LICH_CHUNK_SPLIT);

        io_init(&io, snapid, NULL, chunk_off, len, 0);
        mbuffer_init(&buf, 0);
        ret = volume_proto_snapshot_read_remote(snapnid, &io, &buf, TRUE);
        if (unlikely(ret)) {
                /* three outcomes: definite answers (0 or 1), or an error */
                if (ret != ENOENT && ret != ENOKEY) {
                        GOTO(err_ret, ret);
                }

                *exist = 0;
        }

        mbuffer_free(&buf);
#else
        ret = __volume_proto_snapshot_chunk_exist(snapnid, snapid, chkid->idx, exist);
        if (unlikely(ret)) {
                if (ret != ENOENT && ret != ENOKEY) {
                        GOTO(err_ret, ret);
                }
        }
#endif

        DINFO("snap %s chunk raw.%ju.%u exist %d\n", id2str(snapid), chkid->id, chkid->idx, *exist);
        return 0;
err_ret:
#if SNAPSHOT_EXISTS_WITH_PEEK
        mbuffer_free(&buf);
#endif
        return ret;
}

/* Placeholder "open" step for snapshot listing: ignores its arguments and returns 0. */
static int __volume_proto_snapshot_listopen(volume_proto_t *volume_proto, const char *uuid)
{
        (void) uuid;
        (void) volume_proto;

        return 0;
}

/* Placeholder "close" step for snapshot listing: ignores its arguments and returns 0. */
static int __volume_proto_snapshot_listclose(volume_proto_t *volume_proto, const char *uuid)
{
        (void) uuid;
        (void) volume_proto;

        return 0;
}

/*
 * List the volume's snapshots by delegating to table1->snapshot_list().
 * @uuid and @offset are accepted but unused here; @de and @delen are passed
 * straight through to the table1 callback.
 */
STATIC int __volume_proto_snapshot_list(volume_proto_t *volume_proto, const char *uuid,
                                        uint64_t offset, void *de, int *delen)
{
        (void) offset;
        (void) uuid;

        return volume_proto->table1.snapshot_list(&volume_proto->table1, de, delen);
}

/*
 * Create snapshot @name by delegating to table1->snapshot_create(); @p,
 * @_site and @force are forwarded unchanged.
 * Returns 0 on success or the callback's error code.
 */
static int __volume_proto_snapshot_create(volume_proto_t *volume_proto,
                                          const char *name, int p, const char *_site, int force)
{
        table1_t *tbl = &volume_proto->table1;
        int ret = tbl->snapshot_create(tbl, name, p, _site, force);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Roll back to snapshot @name by delegating to table1->snapshot_rollback(). */
static int __volume_proto_snapshot_rollback(volume_proto_t *volume_proto, const char *name)
{
        return volume_proto->table1.snapshot_rollback(&volume_proto->table1, name);
}

/*
 * Look up snapshot @name through table1->snapshot_lookup(), filling @chkinfo.
 * The callback's last argument is always passed as NULL.
 */
static int __volume_proto_snapshot_lookup(volume_proto_t *volume_proto, const char *name, chkinfo_t *chkinfo)
{
        return volume_proto->table1.snapshot_lookup(&volume_proto->table1, name, chkinfo, NULL);
}

/* Check snapshot @name by delegating to table1->snapshot_check(). */
static int __volume_proto_snapshot_check(volume_proto_t *volume_proto, const char *name)
{
        return volume_proto->table1.snapshot_check(&volume_proto->table1, name);
}

/*
 * Deferred snapshot removal: rename snapshot @name to
 * "<LICH_SYSTEM_ATTR_UNLINK>.<name>.<fresh uuid>" and queue a background
 * removal task for the renamed snapshot.
 *
 * A failure to queue the background task is only logged; the function still
 * returns 0 once the rename has succeeded.
 */
STATIC int __volume_proto_snapshot_remove__(volume_proto_t *volume_proto, const char *name)
{
        int ret;
        uuid_t uuid;
        char uuid_str[MAX_NAME_LEN], unlink_name[MAX_NAME_LEN];
        table1_t *tbl = &volume_proto->table1;

        uuid_generate(uuid);
        uuid_unparse(uuid, uuid_str);
        snprintf(unlink_name, MAX_NAME_LEN, "%s.%s.%s", LICH_SYSTEM_ATTR_UNLINK, name, uuid_str);

        DINFO("vol %s name %s newname %s\n", id2str(&volume_proto->chkid), name, unlink_name);

        /*
         * Mark the snapshot as deleted (by renaming it) and start the
         * background task; the new name embeds the original snapshot name so
         * the task can locate it.
         */
        ret = tbl->snapshot_rename(tbl, name, unlink_name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // volume_bh_snapcleanup(&table1->fileinfo.id, newname);
        ret = rmsnap_bh_create(tbl->pool, &tbl->fileinfo.id, unlink_name);
        if (unlikely(ret)) {
                DWARN("rmsnap_bh_create "CHKID_FORMAT" %s failed ret:%d\n",
                      CHKID_ARG(&tbl->fileinfo.id), unlink_name, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Remove snapshot @name.
 * With @force set, table1->snapshot_remove() is called directly; otherwise
 * the rename-and-background-task path (__volume_proto_snapshot_remove__) is
 * used. Returns 0 on success or the error code of the chosen path.
 */
STATIC int __volume_proto_snapshot_remove(volume_proto_t *volume_proto,
                                          const char *name, int force)
{
        int ret;

        if (force)
                ret = volume_proto->table1.snapshot_remove(&volume_proto->table1, name);
        else
                ret = __volume_proto_snapshot_remove__(volume_proto, name);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/* Forward the `on` field of the protect parameter to table1->snapshot_protect(). */
static int __volume_proto_snapshot_protect(volume_proto_t *volume_proto, const snap_protect_param_t on)
{
        return volume_proto->table1.snapshot_protect(&volume_proto->table1, on.on);
}

/* Delegate to table1->snapshot_updateparent(), forwarding @name and @from. */
STATIC int __volume_proto_snapshot_updateparent(volume_proto_t *volume_proto,
                const char *name, const uint64_t from)
{
        return volume_proto->table1.snapshot_updateparent(&volume_proto->table1, name, from);
}

/* Delegate to table1->snapshot_setfrom(), forwarding @from. */
STATIC int __volume_proto_snapshot_setfrom(volume_proto_t *volume_proto, const uint64_t from)
{
        return volume_proto->table1.snapshot_setfrom(&volume_proto->table1, from);
}

/*
 * Delegate to table1->snapshot_last(); @nid, @fileid, @name and
 * @snap_version are output slots handed straight to the callback.
 */
STATIC int __volume_proto_snapshot_last(volume_proto_t *volume_proto,
                nid_t *nid, fileid_t *fileid, char *name, uint64_t *snap_version)
{
        return volume_proto->table1.snapshot_last(&volume_proto->table1, nid, fileid,
                                                  name, snap_version);
}

/*
 * Delegate to table1->snapshot_prev(); @chkid is the input, while @prev,
 * @name and @snap_version are handed to the callback as-is.
 */
STATIC int __volume_proto_snapshot_prev(volume_proto_t *volume_proto,
                const chkid_t *chkid, chkid_t *prev, char *name, uint64_t *snap_version)
{
        return volume_proto->table1.snapshot_prev(&volume_proto->table1, chkid, prev,
                                                  name, snap_version);
}

#if 0
/*
 * NOTE: this function sits inside an `#if 0` region and is not compiled.
 *
 * Probes every chunk index of the volume via table2->chunk_getinfo(); any
 * result other than ENOENT is treated as "chunk still exists" and the
 * function fails with EAGAIN. Only when all probes return ENOENT is
 * volume_proto->cleanup() invoked.
 *
 * Returns 0 on success, EAGAIN if a chunk is still present, or the error
 * returned by cleanup().
 */
STATIC int __volume_proto_snapshot_cleanup(volume_proto_t *volume_proto)
{
        int ret, chknum, i;
        table1_t *table1;
        table2_t *table2;
        chkid_t chkid;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];

        ANALYSIS_BEGIN(0);

        table1 = &volume_proto->table1;
        table2 = &volume_proto->table2;
        chknum = size2chknum(table1->fileinfo.size, &table1->fileinfo.ec);
        chkinfo = (void *)_chkinfo;

        /* refuse to clean up while any chunk lookup yields something other than ENOENT */
        for (i = 0; i < chknum; i++) {
                fid2cid(&chkid, &table1->fileinfo.id, i);
                ret = table2->chunk_getinfo(table2, &chkid, chkinfo, NULL);
                if (ret != ENOENT) {
                        DWARN("chunk "CHKID_FORMAT" exist\n",
                              CHKID_ARG(&chkid));
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }
        }

        ret = volume_proto->cleanup(volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}
#endif

/*
 * Create chunk index @idx of snapshot @snapid with the content of @buf.
 * The call goes to volume_ctl_newchunk() when @snapnid is the local node and
 * to stor_rpc_newchunk() otherwise.
 * Returns 0 on success or the error code of the call that was made.
 */
static int __volume_proto_snapshot_newchunk(const nid_t *snapnid,
                                            const fileid_t *snapid, int idx, const buffer_t *buf)
{
        int ret;
        chkid_t target;

        ANALYSIS_BEGIN(0);

        fid2cid(&target, snapid, idx);

        if (net_islocal(snapnid))
                ret = volume_ctl_newchunk(snapid, &target, buf);
        else
                ret = stor_rpc_newchunk(snapnid, snapid, &target, buf);

        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Copy one volume chunk into a snapshot (copy-on-write data move).
 *
 * Reads the current content of volume chunk @chkid through chunk_ops.read()
 * and creates chunk index chkid->idx in snapshot @snapid on node @snapnid.
 * Nothing is copied when the chunk's offset is at or beyond the volume size,
 * or (with ECLOG_ENABLE on an EC volume) when the chunk is an EC log chunk.
 *
 * @param volume_proto volume owning the chunk
 * @param chkid        volume chunk to copy
 * @param snapnid      node hosting the snapshot
 * @param snapid       snapshot receiving the copy
 * @return 0 on success (EEXIST from the snapshot side is logged and treated
 *         as success), otherwise the error code of the failing step
 */
static int __volume_proto_snapshot_cow__(volume_proto_t *volume_proto,
                                       const chkid_t *chkid, const nid_t *snapnid,
                                       const fileid_t *snapid)
{
        int ret, size;
        uint64_t offset, left, split_size;
        buffer_t buf;
        io_t io;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        table2_t *table2;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];
        ec_t *ec;
        vclock_t vclock;
        vfm_t *vfm;
        char _vfm[VFM_SIZE(VFM_COUNT_MAX)];

        //DINFO("cow "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;

        table2 = &volume_proto->table2;
        vfm = (void *)_vfm;
        ret = table2->pre_io(table2, chkid, chkinfo, chkstat, vfm, &vclock.clock, __OP_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vclock.vfm = vfm->clock;

        /* work out the chunk's byte offset inside the volume */
        ec = &volume_proto->table1.fileinfo.ec;
#if ENABLE_EC
        if (EC_ISEC(ec)) {
                split_size = ec->k * LICH_CHUNK_SPLIT;
#if ECLOG_ENABLE
                if (!eclog_chunk_islog(&chkinfo->id, ec)) {
                        offset = (LLU)eclog_chunk_skiplog(&chkinfo->id, ec) * split_size;
                } else {
                        /* EC log chunks are not copied */
                        goto out;
                }
#else
                offset = (LLU)chkinfo->id.idx * split_size;
#endif
        } else {
                split_size = LICH_CHUNK_SPLIT;
                offset = (LLU)chkinfo->id.idx * split_size;
        }
#else
        (void) ec;
        split_size = LICH_CHUNK_SPLIT;
        offset = (LLU)chkinfo->id.idx * split_size;
#endif

        if (volume_proto->table1.fileinfo.size > offset) {
                /* the last chunk may be shorter than a full split */
                left = volume_proto->table1.fileinfo.size - offset;
                size = left > split_size ? split_size : left;

                YASSERT(size > 0);

                mbuffer_init(&buf, 0);
                io_init(&io, &chkinfo->id, &vclock, 0, size, volume_proto->table1.fileinfo.attr & __FILE_ATTR_WRITEBACK__);
                ret = volume_proto->chunk_ops.read(chkinfo, chkstat, vfm, &io,
                                                   &buf, &volume_proto->table1.fileinfo.ec);
                if (unlikely(ret)) {
                        /* buf was initialised above: release it on failure too */
                        GOTO(err_free, ret);
                }

                DBUG("cow read vol %s\n", id2str(&chkinfo->id));

                ret = __volume_proto_snapshot_newchunk(snapnid, snapid, chkid->idx, &buf);
                if (unlikely(ret)) {
                        if (ret == EEXIST) {
                                /* the snapshot already holds this chunk: nothing more to do */
                                DWARN("cow "CHKID_FORMAT"[%u] @ %s exist\n",
                                      CHKID_ARG(snapid), chkid->idx, network_rname(snapnid));
                        } else
                                GOTO(err_free, ret);
                }

                mbuffer_free(&buf);

                DBUG("cow write vol %s to snap "CHKID_FORMAT"\n", id2str(&chkinfo->id), CHKID_ARG(snapid));
        }
#if ECLOG_ENABLE
out:
#endif
        return 0;
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

/*
 * Context handed to __volume_proto_snapshot_read_remote__() through
 * core_request0(): describes one snapshot read and carries back its result.
 * NOTE(review): `fileid`, `max` and `pool` are not written or read by the
 * code visible in this part of the file — confirm whether they are still used.
 */
typedef struct {
        /*
        nid_t nid;
        */
        fileid_t fileid;
        size_t size;            /* number of bytes to read */
        off_t offset;           /* byte offset of the read */

        nid_t snapnid;          /* node hosting the snapshot */
        fileid_t snapid;        /* snapshot to read from */
        int peek;               /* non-zero: issue the read with __FILE_ATTR_PEEK__ */
        uint64_t max;

        buffer_t *buf;          /* destination buffer, owned by the requester */
        int retval;             /* result of the read, filled in by the worker */
        char pool[MAX_NAME_LEN];
} core_ctx_t;

/*
 * Worker run through core_request0(): performs the snapshot read described by
 * the core_ctx_t in @_core_ctx and stores the outcome in its `retval` field.
 * The read uses volume_ctl_read() when the snapshot node is local and
 * stor_rpc_read() otherwise.
 */
static void __volume_proto_snapshot_read_remote__(void *_core_ctx)
{
        core_ctx_t *ctx = _core_ctx;
        io_t io;
        int ret;

        io_init(&io, &ctx->snapid, NULL, ctx->offset, ctx->size,
                ctx->peek ? __FILE_ATTR_PEEK__ : 0);

        if (net_islocal(&ctx->snapnid))
                ret = volume_ctl_read(&io, ctx->buf, 0);
        else
                ret = stor_rpc_read(&ctx->snapnid, &io, ctx->buf);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ctx->retval = 0;
        return;
err_ret:
        ctx->retval = ret;
        return;
}

/**
 * @brief Read a range of one snapshot.
 *
 * The read is dispatched with core_request0() to the core selected by
 * core_hash(&io->id). ESTALE from the dispatch is reported as EAGAIN. When
 * the final error is ENONET, the pool and chunk info of the snapshot are
 * looked up before ENONET is returned (the original comment ties this to
 * snapshot master replica migration).
 *
 * @param snapnid node hosting the snapshot
 * @param io      describes the snapshot id, offset and size (size must be non-zero)
 * @param buf     buffer receiving the data
 * @param peek    non-zero to issue the read as a peek
 * @return 0 on success, otherwise an error code
 */
int volume_proto_snapshot_read_remote(const nid_t *snapnid, const io_t *io, buffer_t *buf, int peek)
{
        int ret;
        core_ctx_t ctx;
        char poolname[MAX_NAME_LEN];
        char info_buf[CHKINFO_MAX];
        chkinfo_t *info;

        info = (void *)info_buf;

        ANALYSIS_BEGIN(0);

        ctx.buf = buf;
        ctx.peek = peek;
        ctx.retval = 0;
        ctx.snapnid = *snapnid;
        ctx.snapid = io->id;
        ctx.offset = io->offset;
        ctx.size = io->size;

        YASSERT(io->size);
        //YASSERT(core_self() && core_self()->hash == core_hash(snapid));

        ret = core_request0(core_hash(&io->id), __volume_proto_snapshot_read_remote__,
                            &ctx, "snapshot_read");
        if (unlikely(ret)) {
                /* a stale dispatch is reported to the caller as "try again" */
                if (ret == ESTALE)
                        ret = EAGAIN;

                GOTO(err_mig, ret);
        }

        ret = ctx.retval;
        if (unlikely(ret))
                GOTO(err_mig, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_mig:
        if (ret == ENONET) {
                /* for snap master replic migration */
                ret = md_getpool(&io->id, poolname);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = md_chunk_getinfo(poolname, NULL, &io->id, info, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* the lookups succeeded; still report the original failure */
                ret = ENONET;
        }

err_ret:
        ANALYSIS_END(0, IO_WARN, NULL);
        return ret;
}

/**
 * @brief Read one chunk's data from a snapshot and write it into the volume.
 *
 * The byte range of @chkid is read from snapshot @snapid on @snapnid, then
 * written to the volume chunk through chunk_ops.write(), bracketed by
 * table2 chunk_check()/pre_io()/post_io(). On a write failure the chunk's
 * table2 state is reset. With ECLOG_ENABLE on an EC volume, EC log chunks are
 * skipped and the function returns 0 without touching @update.
 *
 * The read buffer is released on every exit path.
 *
 * @param volume_proto volume being rolled back
 * @param snapnid      node hosting the snapshot
 * @param snapid       snapshot to read from
 * @param chkid        volume chunk to overwrite
 * @param update       set to TRUE once the chunk has been written
 * @return 0 on success, otherwise the error code of the failing step (the
 *         snapshot read's error code is passed through unchanged)
 */
static int __volume_proto_snapshot_rollback__(volume_proto_t *volume_proto,
                                              const nid_t *snapnid, const fileid_t *snapid,
                                              const chkid_t *chkid, int *update)
{
        int ret, localize;
        buffer_t buf;
        io_t io;
        vclock_t vclock;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        table2_t *table2;
        vfm_t *vfm;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];
        char _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        /* zero-initialised so the log at `out` never reads indeterminate values */
        uint64_t offset = 0, split_size, left;
        uint32_t size = 0;
        ec_t *ec;

        DINFO("rollback "CHKID_FORMAT" to snap vol.%ju.%u\n", CHKID_ARG(chkid), snapid->id, snapid->idx);

        /* work out the chunk's byte offset inside the volume */
        ec = &volume_proto->table1.fileinfo.ec;
#if ENABLE_EC
        if (EC_ISEC(ec)) {
                split_size = ec->k * LICH_CHUNK_SPLIT;
#if ECLOG_ENABLE
                if (!eclog_chunk_islog(chkid, ec)) {
                        offset = (LLU)eclog_chunk_skiplog(chkid, ec) * split_size;
                } else {
                        /* EC log chunks are not rolled back */
                        goto out;
                }
#else
                offset = (LLU)chkid->idx * split_size;
#endif
        } else {
                split_size = LICH_CHUNK_SPLIT;
                offset = (LLU)chkid->idx * split_size;
        }
#else
        (void) ec;
        split_size = LICH_CHUNK_SPLIT;
        offset = (LLU)chkid->idx * split_size;
#endif

        /* the last chunk may be shorter than a full split */
        left = volume_proto->table1.fileinfo.size - offset;
        size = left > split_size ? split_size : left;

        YASSERT(size > 0);

        volume_proto->rollback_ctx.stat.snap_read++;

        io_init(&io, snapid, NULL, offset, size, 0);
        mbuffer_init(&buf, 0);
        ret = volume_proto_snapshot_read_remote(snapnid, &io, &buf, FALSE);
        if (unlikely(ret)) {
                /* buf was initialised above: release it on failure too */
                GOTO(err_free, ret);
        }

        DINFO("snap rollback read snap %s for vol raw.%ju.%u offset %ju size %u\n",
              id2str(snapid), chkid->id, chkid->idx, offset, size);

        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;
        vfm = (void *)_vfm;
        table2 = &volume_proto->table2;

        localize = !!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__);
        ret = table2->chunk_check(table2, chkid, __OP_WRITE, localize, NULL, NULL);
        if (unlikely(ret)) {
                /* buf holds the snapshot data at this point: do not leak it */
                GOTO(err_free, ret);
        }

        ret = table2->pre_io(table2, chkid, chkinfo, chkstat, vfm, &vclock.clock, __OP_WRITE);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT && ret != ENOKEY);
                GOTO(err_free, ret);
        }

        vclock.vfm = vfm->clock;

        volume_proto->rollback_ctx.stat.vol_write++;

        io_init(&io, &chkinfo->id, &vclock, 0, buf.len, volume_proto->table1.fileinfo.attr);
        ret = volume_proto->chunk_ops.write(chkinfo, chkstat, vfm, &io, &buf,
                                            &volume_proto->table1.fileinfo.ec);
        if (unlikely(ret)) {
                DWARN("rollback "CHKID_FORMAT" fail\n", CHKID_ARG(chkid));
                YASSERT(ret != ENOENT && ret != ENOKEY);
                table2->reset(table2, chkid);
                GOTO(err_free, ret);
        }

        ret = table2->post_io(table2, chkid, chkinfo, chkstat, vclock.clock);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT && ret != ENOKEY);
                GOTO(err_free, ret);
        }

        *update = TRUE;

        mbuffer_free(&buf);
#if ECLOG_ENABLE
out:
#endif
        DINFO("snap rollback write snap %s to vol raw.%ju.%u offset %ju size %u\n",
              id2str(snapid), chkid->id, chkid->idx, offset, size);
        return 0;
err_free:
        mbuffer_free(&buf);
        return ret;
}

/*
 * Copy-on-write entry point for volume chunk @chkid.
 *
 * Unless fileinfo.snap_from is (uint64_t)-1 (logged as "snap removed"), the
 * snapshot with version snap_from is looked up and the chunk is copied into
 * it. In both cases the chunk's snapshot version is then updated to
 * fileinfo.snap_version through table2->chunk_snapshot_update().
 *
 * Returns 0 on success or the error code of the failing step.
 */
static int __volume_proto_snapshot_cow(volume_proto_t *volume_proto,
                                     const chkid_t *chkid)
{
        int ret;
        char info_buf[CHKINFO_MAX];
        chkinfo_t *snapinfo = (void *)info_buf;
        table1_t *tbl = &volume_proto->table1;
        table2_t *tbl2 = &volume_proto->table2;
        fileinfo_t *finfo = &tbl->fileinfo;

        if (finfo->snap_from == (uint64_t)-1) {
                DINFO("file "CHKID_FORMAT" snap removed\n",
                      CHKID_ARG(chkid));
        } else {
                ret = tbl->snapshot_getbyversion(tbl, finfo->snap_from, snapinfo, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __volume_proto_snapshot_cow__(volume_proto, chkid,
                                                    &snapinfo->diskid[0].id, &snapinfo->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        DBUG("update "CHKID_FORMAT", snap_version %ju\n", CHKID_ARG(chkid), finfo->snap_version);
        ret = tbl2->chunk_snapshot_update(tbl2, chkid, finfo->snap_version, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Read one chunk's data from the volume and write it into the auto snap.
 *
 * Reads volume chunk @chkid through chunk_ops.read() and creates chunk index
 * chkid->idx in auto snapshot @autoid on node @autonid. The rollback
 * `vol_read` and `auto_write` statistics are bumped along the way. The read
 * buffer is released on every exit path after it has been initialised.
 *
 * @param volume_proto volume being rolled back
 * @param autonid      node hosting the auto snapshot
 * @param autoid       auto snapshot receiving the backup
 * @param chkid        volume chunk to back up
 * @return 0 on success (EEXIST from the auto snapshot side is logged and
 *         treated as success), otherwise the error code of the failing step
 */
STATIC int __volume_proto_snapshot_auto_backup__(volume_proto_t *volume_proto,
                                                 const nid_t *autonid, const fileid_t *autoid,
                                                 const chkid_t *chkid)
{
        int ret, size;
        chkinfo_t *chkinfo;
        chkstat_t *chkstat;
        table2_t *table2;
        buffer_t buf;
        io_t io;
        uint64_t offset, left;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];
        vclock_t vclock;

        DINFO("chkid "CHKID_FORMAT" need backup to autosnap "CHKID_FORMAT"\n",
              CHKID_ARG(chkid), CHKID_ARG(autoid));

        chkinfo = (void *)_chkinfo;
        chkstat = (void *)_chkstat;

        vfm_t *vfm;
        char _vfm[VFM_SIZE(VFM_COUNT_MAX)];
        vfm = (void *)_vfm;
        table2 = &volume_proto->table2;
        ret = table2->pre_io(table2, chkid, chkinfo, chkstat, vfm, &vclock.clock, __OP_READ);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vclock.vfm = vfm->clock;

        /* the last chunk may be shorter than a full split */
        offset = (LLU)chkinfo->id.idx * LICH_CHUNK_SPLIT;
        left = volume_proto->table1.fileinfo.size - offset;
        size = left > LICH_CHUNK_SPLIT ? LICH_CHUNK_SPLIT : left;
        io_init(&io, &chkinfo->id, &vclock, 0, size, 0);

        volume_proto->rollback_ctx.stat.vol_read++;

        mbuffer_init(&buf, 0);
        ret = volume_proto->chunk_ops.read(chkinfo, chkstat, vfm, &io, &buf,
                                           &volume_proto->table1.fileinfo.ec);
        if (unlikely(ret)) {
                /* buf was initialised above: release it on failure too */
                GOTO(err_free, ret);
        }

        DINFO("snap rollback read vol %s\n", id2str(&chkinfo->id));

        volume_proto->rollback_ctx.stat.auto_write++;

        ret = __volume_proto_snapshot_newchunk(autonid, autoid, chkid->idx, &buf);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        /* the auto snap already holds this chunk: nothing more to do */
                        DWARN("autosnap "CHKID_FORMAT"[%u] @ %s exist\n",
                              CHKID_ARG(autoid), chkid->idx, network_rname(autonid));
                } else
                        GOTO(err_free, ret);
        }

        mbuffer_free(&buf);

        DINFO("snap rollback write vol %s snap to raw.%ju.%u\n",
              id2str(&chkinfo->id), autoid->id, autoid->idx);
        return 0;
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

#if LICH_SNAPTREE_NEWALGO
/*
 * Walk @list from tail to head, skipping the entry whose id equals @fromid.
 * At the first remaining snapshot that contains chunk @chkid, back the
 * volume's chunk up into auto snapshot @autoid on @autonid and stop. If no
 * such snapshot is found nothing is backed up.
 *
 * Returns 0 on success or the error code of the failing step.
 */
STATIC int __volume_proto_snapshot_auto_backup_newalgo(volume_proto_t *volume_proto,
                const fileid_t *fromid, const nid_t *autonid, const fileid_t *autoid,
                const chkid_t *chkid, struct list_head *list)
{
        int ret, present;
        struct list_head *node;
        snap_t *entry;

        /* traverse bottom-up */
        list_for_each_prev(node, list) {
                entry = (void *)node;

                if (chkid_cmp(&entry->chkinfo->id, fromid) == 0)
                        continue;

                ret = __is_chunk_in_snapshot(volume_proto, &entry->chkinfo->diskid[0].id,
                                             &entry->chkinfo->id, chkid, &present);
                if (unlikely(ret)) {
                        YASSERT(ret != ENOENT && ret != ENOKEY);
                        GOTO(err_ret, ret);
                }

                if (present) {
                        ret = __volume_proto_snapshot_auto_backup__(volume_proto,
                                        autonid, autoid, chkid);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }

                        break;
                }
        }

        return 0;
err_ret:
        return ret;
}
#endif

/*
 * Decide whether volume chunk @chkid has to be saved into the auto snapshot
 * (@autonid/@autoid) and do so if needed.
 *
 * The snapshot with version fileinfo.snap_from is looked up first. If it
 * already contains the chunk, nothing is done. Otherwise the backup is
 * performed: via the list-driven new algorithm when LICH_SNAPTREE_NEWALGO is
 * enabled, or unconditionally when it is not (then @list is unused).
 *
 * Returns 0 on success or the error code of the failing step.
 */
STATIC int __volume_proto_snapshot_auto_backup(volume_proto_t *volume_proto,
                                              const nid_t *autonid, const fileid_t *autoid,
                                              const chkid_t *chkid, struct list_head *list)
{
        int ret, present;
        char info_buf[CHKINFO_MAX];
        chkinfo_t *frominfo = (void *)info_buf;
        table1_t *tbl = &volume_proto->table1;
        fileinfo_t *finfo = &tbl->fileinfo;
        nid_t *fromnid;
        fileid_t *fromid;

        /**
         * which chunk needs backup to auto snapshot:
         * a chunk modified within the list that does not exist in the last
         * snapshot (fileinfo->snap_from).
         */
        ret = tbl->snapshot_getbyversion(tbl, finfo->snap_from, frominfo, NULL);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        fromnid = &frominfo->diskid[0].id;
        fromid = &frominfo->id;

        ret = __is_chunk_in_snapshot(volume_proto, fromnid, fromid, chkid, &present);
        if (unlikely(ret)) {
                YASSERT(ret != ENOENT && ret != ENOKEY);
                GOTO(err_ret, ret);
        }

        if (present)
                return 0;

#if LICH_SNAPTREE_NEWALGO
        ret = __volume_proto_snapshot_auto_backup_newalgo(volume_proto, fromid, autonid, autoid,
                                                          chkid, list);
#else
        (void) list;
        ret = __volume_proto_snapshot_auto_backup__(volume_proto,
                                autonid, autoid, chkid);
#endif
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Find the first entry in @list whose chunk id equals chkinfo->id and
 * overwrite that entry's chkinfo with @chkinfo. Always returns 0, whether or
 * not a matching entry was found.
 */
STATIC int __volume_proto_snapshot_updatelist(struct list_head *list, const chkinfo_t *chkinfo)
{
        struct list_head *node;
        snap_t *entry;

        list_for_each(node, list) {
                entry = (void *)node;

                if (chkid_cmp(&entry->chkinfo->id, &chkinfo->id) != 0)
                        continue;

                CHKINFO_CP(entry->chkinfo, chkinfo);
                break;
        }

        return 0;
}

/*
 * Build into @list the chain of snapshots taken from the parent chain of
 * version @src, stopping at the first node that also appears in the parent
 * chain of version @dst (that common node is included).
 *
 * Both parent chains are fetched with table1->snapshot_listparent(). Nodes
 * are moved (not copied) from the @src chain into @list: appended at the tail
 * when @bottomup is non-zero, inserted at the head otherwise. Everything left
 * in the two temporary chains is freed before returning, on success and on
 * error alike.
 *
 * Returns 0 on success or the error code of snapshot_listparent().
 */
STATIC int volume_proto_snapshot_buildlist(volume_proto_t *volume_proto, uint64_t src,
                uint64_t dst, int bottomup, struct list_head *list)
{
        int ret, found = 0;
        table1_t *table1;
        struct list_head from_list, rollback_list;
        struct list_head *from_pos, *from_p, *rollback_pos;
        snap_t *from_snap, *rollback_snap;

        table1 = &volume_proto->table1;
        INIT_LIST_HEAD(&from_list);
        INIT_LIST_HEAD(&rollback_list);

        ret = table1->snapshot_listparent(table1, src, &from_list);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table1->snapshot_listparent(table1, dst, &rollback_list);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /**
         * root
         * `----snap1
         *      |-----snap2
         *      |        `snap3  (rollback)
         *      `----snap4
         *           |----snap5
         *           `----snap6
         *                `----vol_curr
         *
         * from_list: snap6 --> snap4 --> snap1 --> root
         * rollback_list: snap3 --> snap2 --> snap1 --> root
         * return list: snap1 --> snap4 --> snap6 (reverse order of from_list from common parent to end)
         */
        /*
         * NOTE(review): the walk below starts at the TAIL of from_list and
         * stops at the first version also present in rollback_list. With the
         * orderings written above the tail would be root, which both chains
         * share — confirm the order snapshot_listparent() actually returns.
         */
        list_for_each_prev_safe(from_pos, from_p, &from_list) {
                from_snap = (void *)from_pos;

                /* is this node also an ancestor of the rollback target? */
                list_for_each(rollback_pos, &rollback_list) {
                        rollback_snap = (void *)rollback_pos;
                        if (from_snap->snap_version == rollback_snap->snap_version) {
                                found = 1;
                                break;
                        }
                }

                /* hand the node over to the caller's list (ownership moves with it) */
                list_del(from_pos);
                if (bottomup) {
                        list_add_tail(&from_snap->hook, list);
                } else {
                        list_add(&from_snap->hook, list);
                }

                /* the common node has been moved too; nothing further is needed */
                if (found)
                        break;
        }

        __snapshot_list_free(&from_list);
        __snapshot_list_free(&rollback_list);
        return 0;
err_ret:
        __snapshot_list_free(&from_list);
        __snapshot_list_free(&rollback_list);
        return ret;
}

#if LICH_SNAPTREE_NEWALGO
/**
 * Try to restore @chkid from the snapshots in @list, one entry after another.
 *
 * An entry that answers ENOENT or ENOKEY is skipped and the next one is
 * tried; the first entry that restores successfully ends the walk.  Any other
 * error is returned.  If every entry is skipped the function still returns 0.
 *
 * @param volume_proto  volume being rolled back
 * @param chkid         chunk to restore
 * @param list          list whose nodes are read as snap_t, walked in order
 * @param update        passed through to __volume_proto_snapshot_rollback__
 * @return 0 on success or when no entry held the chunk, otherwise an errno
 */
STATIC int __volume_proto_snapshot_rollback_newalgo(volume_proto_t *volume_proto,
                const chkid_t *chkid, struct list_head *list, int *update)
{
        int ret;
        struct list_head *cursor;
        snap_t *entry;

        list_for_each(cursor, list) {
                entry = (void *)cursor;

                ret = __volume_proto_snapshot_rollback__(volume_proto,
                                &entry->chkinfo->diskid[0].id, &entry->chkinfo->id,
                                chkid, update);
                if (!ret) {
                        /* restored from this snapshot, nothing more to do */
                        break;
                }

                if (ret != ENOENT && ret != ENOKEY)
                        GOTO(err_ret, ret);

                /* chunk not present in this snapshot: fall through to the next one */
        }

        return 0;
err_ret:
        return ret;
}
#endif

/**
 * Back up @chkid into the snapshot described by @chkinfo via
 * __volume_proto_snapshot_auto_backup.
 *
 * If the backup fails with ENONET, one recovery pass is made: md_chunk_check,
 * then md_chunk_getinfo to reload @chkinfo, then
 * __volume_proto_snapshot_updatelist to push the new chkinfo into @list, and
 * the backup is attempted a second time.  A second ENONET, or any other
 * error, is returned.
 *
 * @param volume_proto  owning volume
 * @param chkid         chunk to back up
 * @param chkinfo       snapshot chunk info; may be rewritten by the recovery pass
 * @param list          snapshot list handed to the backup and refreshed on recovery
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_chunk_backup(volume_proto_t *volume_proto, const chkid_t *chkid,
                chkinfo_t *chkinfo, struct list_head *list)
{
        int ret, attempt;
        table1_t *table1 = &volume_proto->table1;
        fileid_t snapid = chkinfo->id;

        for (attempt = 0; ; attempt++) {
                ret = __volume_proto_snapshot_auto_backup(volume_proto,
                                &chkinfo->diskid[0].id, &snapid, chkid, list);
                if (!ret)
                        break;

                /* only ENONET is recoverable, and only once */
                if (ret != ENONET || attempt != 0)
                        GOTO(err_ret, ret);

                CHKINFO_DUMP(chkinfo, D_INFO);
                /**
                 * chkinfo->diskid[0].id maybe not online, rollback always fail.
                 * so, we need migrate and get chunk info again(migrate in md_chunk_getinfo).
                 */
                ret = md_chunk_check(net_getnid(), &volume_proto->chkid, chkid, -1, 0, 0, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = md_chunk_getinfo(table1->pool, &volume_proto->chkid, &snapid, chkinfo, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                ret = __volume_proto_snapshot_updatelist(list, chkinfo);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Restore @chkid from the snapshot chain, starting at version @_snap_version.
 *
 * Each pass looks the snapshot up by version and calls
 * __volume_proto_snapshot_rollback__ against it:
 *  - success ends the loop;
 *  - ENONET gets one recovery attempt per snapshot (chunk check, chkinfo
 *    reload, list refresh) before the error is returned;
 *  - ENOENT / ENOKEY moves on to the version reported by
 *    table1->snapshot_next and tries again.
 * The loop also ends, without restoring anything, once the version being
 * examined equals fileinfo->snap_version.
 *
 * When snapshot_next itself reports ENOENT, builds with
 * LICH_SNAPTREE_NEWALGO fall back to walking @list; other builds assert.
 *
 * @param volume_proto   volume being rolled back
 * @param chkid          chunk to restore
 * @param chkinfo        scratch chunk info, overwritten on every lookup
 * @param list           snapshot list, refreshed on recovery and used by the fallback
 * @param _snap_version  snapshot version to start from
 * @param update         passed through to the per-snapshot restore
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_chunk_rollback__(volume_proto_t *volume_proto, const chkid_t *chkid,
                chkinfo_t *chkinfo, struct list_head *list, uint64_t _snap_version, int *update)
{
        int ret, retry = 0;
        uint64_t snap_version = _snap_version;
        fileinfo_t *fileinfo;
        fileid_t snapid;
        table1_t *table1;

        table1 = &volume_proto->table1;
        fileinfo = &table1->fileinfo;

        while (1) {
                /* reached the volume's own version: nothing left to look at */
                if (snap_version == fileinfo->snap_version)
                        break;

                ret = table1->snapshot_getbyversion(table1, snap_version, chkinfo, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* the ENONET retry budget is per snapshot */
                retry = 0;
                snapid = chkinfo->id;
retry:
                ret = __volume_proto_snapshot_rollback__(volume_proto,
                                &chkinfo->diskid[0].id, &snapid, chkid, update);
                if (unlikely(ret)) {
                        if (ret == ENONET) {
                                if (retry != 0)
                                        GOTO(err_ret, ret);

                                CHKINFO_DUMP(chkinfo, D_INFO);
                                /**
                                 * chkinfo->diskid[0].id maybe not online, rollback always fail.
                                 * so, we need migrate and get chunk info again(migrate in md_chunk_getinfo).
                                 */

                                ret = md_chunk_check(net_getnid(), &volume_proto->chkid, chkid, -1, 0, 0, NULL);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                ret = md_chunk_getinfo(table1->pool, &volume_proto->chkid, &snapid, chkinfo, NULL);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                ret =  __volume_proto_snapshot_updatelist(list, chkinfo);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                retry = 1;
                                goto retry;
                        } else if (ret == ENOENT || ret == ENOKEY) {
                                /* chunk absent in this snapshot: step to the next one (snapid is updated in place) */
                                ret = table1->snapshot_next(table1, &snapid, &snapid, NULL, &snap_version);
                                if (unlikely(ret)) {
                                        if (ret == ENOENT) {
#if LICH_SNAPTREE_NEWALGO
                                                /* no next snapshot: fall back to the prebuilt list */
                                                ret = __volume_proto_snapshot_rollback_newalgo( volume_proto,
                                                                chkid, list, update);
                                                if (unlikely(ret))
                                                        GOTO(err_ret, ret);

                                                break;
#else
                                                /* should not come here */
                                                YASSERT(0);
#endif
                                        } else
                                                GOTO(err_ret, ret);
                                }

                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                break;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Roll a single chunk of the volume back to fileinfo->snap_rollback.
 *
 * Three timed stages:
 *  1. back the chunk up into the snapshot found for fileinfo->snap_version,
 *     or, when that lookup fails for any reason, into the one for
 *     fileinfo->snap_from;
 *  2. restore the chunk starting from version fileinfo->snap_rollback;
 *  3. record fileinfo->snap_rollback on the chunk through
 *     table2->chunk_snapshot_update (currently unconditional, see the
 *     "1 || update" test).
 * The elapsed time of each stage is logged at the end.
 *
 * @param volume_proto  volume being rolled back
 * @param chkid         chunk to roll back
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_chunk_rollback(volume_proto_t *volume_proto, const chkid_t *chkid)
{
        int ret, update = FALSE;
        fileinfo_t *fileinfo;
        table1_t *table1;
        table2_t *table2;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        uint64_t snap_version;
        struct list_head list;

        struct timeval t1, t2, t3, t4;

        table1 = &volume_proto->table1;
        fileinfo = &table1->fileinfo;

        chkinfo = (void *)_chkinfo;
        snap_version = fileinfo->snap_rollback;

        _gettimeofday(&t1, NULL);

        /* snapshot list between snap_from and snap_rollback, shared by both stages */
        INIT_LIST_HEAD(&list);
        ret = volume_proto_snapshot_buildlist(volume_proto, fileinfo->snap_from,
                        fileinfo->snap_rollback, 0, &list);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // stage 1: save the data currently on the volume into the auto snap
        // at this point fileinfo->snap_version denotes the generated auto snap
        ret = table1->snapshot_getbyversion(table1, fileinfo->snap_version, chkinfo, NULL);
        if (!ret) {
                /* there has auto snap, check data is need backup */
                ret = __volume_proto_snapshot_chunk_backup(volume_proto, chkid, chkinfo, &list);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = table1->snapshot_getbyversion(table1, fileinfo->snap_from, chkinfo, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                /* if have not auto snap, then check data is need backup to snap_from */
                ret = __volume_proto_snapshot_chunk_backup(volume_proto, chkid, chkinfo, &list);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        _gettimeofday(&t2, NULL);

        // stage 2: restore data from the snapshot onto the volume
        ret = __volume_proto_snapshot_chunk_rollback__(volume_proto, chkid, chkinfo, &list, snap_version, &update);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        _gettimeofday(&t3, NULL);

        // TODO update not ready
        /* must update every chunk */
        if (1 || update) {
                DINFO("update "CHKID_FORMAT", snap_version %ju\n", CHKID_ARG(chkid), fileinfo->snap_rollback);

                volume_proto->rollback_ctx.stat.update_meta++;

                table2 = &volume_proto->table2;
                ret = table2->chunk_snapshot_update(table2, chkid, fileinfo->snap_rollback, volume_proto);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        _gettimeofday(&t4, NULL);

        /* per-stage durations: backup, restore, metadata update */
        int64_t used1 = _time_used(&t1, &t2);
        int64_t used2 = _time_used(&t2, &t3);
        int64_t used3 = _time_used(&t3, &t4);

        DINFO("used %jd %jd %jd\n", used1, used2, used3);

        __snapshot_list_free(&list);
        return 0;
err_ret:
        __snapshot_list_free(&list);
        return ret;
}

/**
 * Bring a chunk in line with the volume's snapshot state before an I/O.
 *
 * When fileinfo->snap_version equals fileinfo->snap_rollback, the only work
 * is a copy-on-write for writes to a chunk whose version differs from the
 * volume's.  When they differ, a chunk whose version is not yet
 * fileinfo->snap_rollback is rolled back (after an optional COW in builds
 * without LICH_SNAPTREE_NEWALGO), and a BH_SNAP_ROLLBACK item is queued with
 * volume_bh_add.
 *
 * @param volume_proto  owning volume
 * @param chkid         chunk about to be accessed
 * @param snap_version  version currently recorded on the chunk
 * @param op            operation code; only __OP_WRITE triggers COW
 * @return 0 on success, otherwise an errno value
 */
int volume_proto_snapshot_check(volume_proto_t *volume_proto, const chkid_t *chkid,
                                uint64_t snap_version, int op)
{
        int ret;
        fileinfo_t *fileinfo = &volume_proto->table1.fileinfo;

        if (likely(fileinfo->snap_version == fileinfo->snap_rollback)) {
                if (unlikely(op == __OP_WRITE && fileinfo->snap_version != snap_version)) {
                        DINFO("cow "CHKID_FORMAT", snap_version %ju %ju\n",
                              CHKID_ARG(chkid), fileinfo->snap_version, snap_version);

                        // fileinfo->snap_version is the volume's current version;
                        // creating a snapshot increments it.
                        // snap_version is the version of the chunk on the volume
                        // (snap_version on a snapshot is meaningless?):
                        //   - a COW sets it to fileinfo->snap_version
                        //   - a ROLLBACK sets it to fileinfo->snap_rollback
                        // If the two differ, the chunk is being updated and no COW
                        // has happened for it yet.
                        ret = __volume_proto_snapshot_cow(volume_proto, chkid);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                return 0;
        }

        /* a rollback is in progress */
#if ENABLE_CHUNK_DEBUG
        DINFO("rollback "CHKID_FORMAT", snap_version %ju %ju %ju\n",
              CHKID_ARG(chkid), fileinfo->snap_version,
              fileinfo->snap_rollback, snap_version);
#else
        DBUG("rollback "CHKID_FORMAT", snap_version %ju %ju %ju\n",
              CHKID_ARG(chkid), fileinfo->snap_version,
              fileinfo->snap_rollback, snap_version);
#endif

#if !LICH_SNAPTREE_NEWALGO
        if (fileinfo->snap_rollback != snap_version
            && fileinfo->snap_version != snap_version) {
                DINFO("cow "CHKID_FORMAT", snap_version %ju %ju\n",
                      CHKID_ARG(chkid), fileinfo->snap_version, snap_version);
                ret = __volume_proto_snapshot_cow(volume_proto, chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }
#endif

        /* evaluated after the optional COW above, exactly as before */
        if (fileinfo->snap_rollback != snap_version) {
                volume_bh_add(&fileinfo->id, BH_SNAP_ROLLBACK, 0);

                ret = __volume_proto_snapshot_chunk_rollback(volume_proto, chkid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Run volume_proto_chunk_pre_read on a chunk_io built for @chkid.
 *
 * The io is initialised with a NULL third argument, zero for the next two
 * arguments and the volume's attr flags; the chunk_io scratch pointers are
 * wired to its own embedded storage.
 * NOTE(review): presumably the pre-read is what drives the per-chunk
 * rollback -- confirm against volume_proto_chunk_pre_read.
 *
 * @param volume_proto  owning volume
 * @param chkid         chunk to pre-read
 * @return 0 on success, otherwise the error from the pre-read
 */
static int __volume_proto_snapshot_rollback_bh1(volume_proto_t *volume_proto, chkid_t *chkid)
{
        int ret;
        chunk_io_t cio;
        table1_t *table1 = &volume_proto->table1;

        ANALYSIS_BEGIN(0);

        /* point the scratch views at the storage embedded in the chunk_io */
        cio.chkinfo = (void *)cio.__chkinfo__;
        cio.chkstat = (void *)cio.__chkstat__;
        cio.vfm = (void *)cio.__vfm__;
        io_init(&cio.io, chkid, NULL, 0, 0, table1->fileinfo.attr);

        ret = volume_proto_chunk_pre_read(volume_proto, &cio);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * Remove the auto snapshots found among the children of @snap_from.
 *
 * Removal happens when @force is non-zero, when the volume itself hangs off
 * @snap_from (fileinfo.snap_from == snap_from), or when at least one child
 * is not an auto snapshot.  Otherwise the children are left alone.
 *
 * @param volume_proto  owning volume
 * @param snap_from     version of the snapshot whose children are inspected
 * @param force         non-zero to remove regardless of the checks above
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_remove_auto(volume_proto_t *volume_proto,
                                          uint64_t snap_from, int force)
{
        int ret, need = 0;
        struct list_head children;
        struct list_head *cur;
        snap_t *child;
        table1_t *table1 = &volume_proto->table1;

        INIT_LIST_HEAD(&children);

        ret = table1->snapshot_listchild(table1, snap_from, &children);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (table1->fileinfo.snap_from != snap_from) {
                /* non-leaf node: look for any child that is a real snapshot */
                list_for_each(cur, &children) {
                        child = (void *)cur;
                        if (!is_auto_snap(child->key)) {
                                need = 1;
                        }
                }
        } else {
                /* the volume is a child of snap_from */
                need = 1;
        }

        if (!need && !force)
                goto out;

        list_for_each(cur, &children) {
                child = (void *)cur;
                if (!is_auto_snap(child->key))
                        continue;

                DINFO("auto remove %s need:%d force:%d\n", (char *)child->key, need, force);
                ret = __volume_proto_snapshot_cleanup_direct(volume_proto, child->key);
                if (unlikely(ret)) {
                        DERROR("auto remove %s fail ret:%d\n", child->key, ret);
                        GOTO(err_ret, ret);
                }
        }

out:
        __snapshot_list_free(&children);
        return 0;
err_ret:
        __snapshot_list_free(&children);
        return ret;
}

/**
 * Remove the snapshot with version @snap_from if it is the root snapshot and
 * has no children left.
 *
 * Nothing is done when the snapshot is not a root snapshot (is_root_snap) or
 * still has children.
 *
 * @param volume_proto  owning volume
 * @param snap_from     version of the snapshot to examine
 * @return 0 on success or when nothing had to be removed, otherwise an errno
 */
STATIC int __volume_proto_snapshot_remove_root(volume_proto_t *volume_proto,
                                          uint64_t snap_from)
{
        int ret;
        char name[MAX_NAME_LEN];
        struct list_head children;
        table1_t *table1 = &volume_proto->table1;

        INIT_LIST_HEAD(&children);

        ret = table1->snapshot_getbyversion(table1, snap_from, NULL, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (is_root_snap(name)) {
                ret = table1->snapshot_listchild(table1, snap_from, &children);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (list_empty(&children)) {
                        DINFO("remove root %s\n", name);
                        ret = __volume_proto_snapshot_remove__(volume_proto, name);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        __snapshot_list_free(&children);
        return 0;
err_ret:
        __snapshot_list_free(&children);
        return ret;
}

/**
 * Verify that every existing chunk of the volume carries
 * fileinfo.snap_rollback, then report the rollback snapshot's id and remove
 * its auto snapshots.
 *
 * The scan fails with ECANCELED as soon as the volume's attr has
 * __FILE_ATTR_DELETE__ set, and with EAGAIN at the first chunk whose version
 * differs from snap_rollback.  Chunks for which chunk_getinfo reports ENOENT
 * are skipped.
 *
 * @param volume_proto  owning volume
 * @param snapid        out: id of the snapshot with version snap_rollback
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_rollback_bh2(volume_proto_t *volume_proto, fileid_t *snapid)
{
        int ret, total, idx, deleting;
        chkid_t chkid;
        char storage[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)storage;
        table1_t *table1 = &volume_proto->table1;
        table2_t *table2 = &volume_proto->table2;
        fileinfo_t *fileinfo = &table1->fileinfo;

        ANALYSIS_BEGIN(0);

        total = size2chknum(fileinfo->size, &fileinfo->ec);
        for (idx = 0; idx < total; idx++) {
                fid2cid(&chkid, &fileinfo->id, idx);

                /* re-read the flag on every pass so a delete request stops the scan */
                deleting = fileinfo->attr & __FILE_ATTR_DELETE__;
                if (unlikely(deleting)) {
                        ret = ECANCELED;
                        GOTO(err_ret, ret);
                }

                ret = table2->chunk_getinfo(table2, &chkid, chkinfo);
                if (unlikely(ret)) {
                        if (ret != ENOENT)
                                GOTO(err_ret, ret);

                        /* chunk was never allocated */
                        continue;
                }

                if (fileinfo->snap_rollback != chkinfo->snap_version) {
                        DWARN(CHKID_FORMAT" rollback %ju %ju\n",
                              CHKID_ARG(&chkinfo->id), fileinfo->snap_rollback,
                              chkinfo->snap_version);
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }
        }

        ANALYSIS_END(0, IO_WARN, NULL);

        ret = table1->snapshot_getbyversion(table1, fileinfo->snap_rollback, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *snapid = chkinfo->id;

        /* remove auto snap if this snap not be used anymore */
        ret = __volume_proto_snapshot_remove_auto(volume_proto, fileinfo->snap_rollback, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Forward to the snapshot_rollback_bh operation of the volume's table1.
 *
 * @param volume_proto  owning volume
 * @param buf           buffer handed to table1->snapshot_rollback_bh
 * @return whatever table1->snapshot_rollback_bh returns
 */
static int __volume_proto_snapshot_rollback_bh(volume_proto_t *volume_proto, buffer_t *buf)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->snapshot_rollback_bh(tbl, buf);
}

/**
 * Copy one chunk's worth of data from the source snapshot into a new chunk
 * of the destination snapshot.
 *
 * The byte range is derived from the chunk index (and the EC layout when
 * ENABLE_EC is set) and clipped to @max.  If the source has no data for the
 * range (ENOENT / ENOKEY) nothing is written and 0 is returned; an EEXIST
 * from the destination is likewise treated as success.  With ECLOG_ENABLE,
 * EC log chunks are skipped.
 *
 * The read buffer is released on every path that initialised it.
 *
 * NOTE(review): callers are expected to pass @max greater than the computed
 * offset; otherwise "max - offset" wraps or the size assertion fires.
 *
 * @param srcnid   node to read the source snapshot from
 * @param srcid    id of the source snapshot
 * @param distnid  node holding the destination snapshot
 * @param distid   id of the destination snapshot
 * @param idx      chunk index inside the destination
 * @param max      upper bound used to clip the copied range
 * @param ec       erasure-coding parameters (ignored without ENABLE_EC)
 * @return 0 on success, otherwise an errno value mapped through _errno
 */
static int __volume_proto_snapshot_cleanup_chunk1(const nid_t *srcnid,
                                                  const fileid_t *srcid, const nid_t *distnid,
                                                  const fileid_t *distid, int idx, uint64_t max, const ec_t *ec)
{
        int ret;
        buffer_t buf;
        uint64_t offset, split_size, left;
        uint32_t size;
        io_t io;
        chkid_t chkid;

        ANALYSIS_BEGIN(0);

        fid2cid(&chkid, distid, idx);
#if ENABLE_EC
        if (EC_ISEC(ec)) {
                split_size = ec->k * LICH_CHUNK_SPLIT;
#if ECLOG_ENABLE
                if (!eclog_chunk_islog(&chkid, ec)) {
                        offset = (LLU)eclog_chunk_skiplog(&chkid, ec) * split_size;
                } else {
                        /* log chunks carry no snapshot data; buf is not initialised yet */
                        goto out;
                }
#else
                offset = (LLU)chkid.idx * split_size;
#endif
        } else {
                split_size = LICH_CHUNK_SPLIT;
                offset = (LLU)chkid.idx * split_size;
        }
#else
        (void) ec;
        split_size = LICH_CHUNK_SPLIT;
        offset = (LLU)chkid.idx * split_size;
#endif

        /* clip the last chunk to the end of the data */
        left = max - offset;
        size = left > split_size ? split_size : left;

        YASSERT(size > 0);

        io_init(&io, srcid, NULL, offset, size, 0);

        mbuffer_init(&buf, 0);
        ret = volume_proto_snapshot_read_remote(srcnid, &io, &buf, FALSE);
        if (unlikely(ret)) {
                if (ret == ENOENT || ret == ENOKEY) {
                        /* source holds nothing for this range: release buf and succeed */
                        mbuffer_free(&buf);
                        goto out;
                } else {
                        DWARN("read "CHKID_FORMAT" @ %s (%d) %s\n", CHKID_ARG(&chkid),
                              network_rname(srcnid), ret, strerror(ret));

                        ret = _errno(ret);
                        GOTO(err_free, ret);
                }
        }

        DBUG("read chunk "CHKID_FORMAT" @ %s\n", CHKID_ARG(srcid),
             network_rname(srcnid));

        ret = __volume_proto_snapshot_newchunk(distnid, distid, idx, &buf);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        /* destination chunk already present: nothing to do */
                } else {
                        /* the write goes to the destination node, so report that one */
                        DWARN("write chunk "CHKID_FORMAT" @ %s (%d) %s\n", CHKID_ARG(&chkid),
                              network_rname(distnid), ret, strerror(ret));
                        ret = _errno(ret);
                        GOTO(err_free, ret);
                }
        }

        mbuffer_free(&buf);
out:
        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_free:
        mbuffer_free(&buf);
        return ret;
}

/**
 * Ask whether chunk @idx of snapshot @snapid exists.
 *
 * The query goes through volume_ctl_chunk_exist when @snapnid is the local
 * node (net_islocal) and through stor_rpc_chunk_exist otherwise.
 *
 * @param snapnid  node holding the snapshot
 * @param snapid   snapshot id
 * @param idx      chunk index inside the snapshot
 * @param exist    out: filled in by the existence query
 * @return 0 on success, otherwise the error from the query
 */
STATIC int __volume_proto_snapshot_chunk_exist(const nid_t *snapnid,
                                            const fileid_t *snapid, int idx, int *exist)
{
        int ret;
        chkid_t target;

        ANALYSIS_BEGIN(0);

        fid2cid(&target, snapid, idx);

        if (net_islocal(snapnid))
                ret = volume_ctl_chunk_exist(snapid, &target, exist);
        else
                ret = stor_rpc_chunk_exist(snapnid, snapid, &target, exist);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * Copy chunk @idx from the source snapshot to the destination snapshot
 * unless the destination already has it.
 *
 * A NULL @distnid means there is no destination and the call is a no-op.
 *
 * @param srcnid   node to read the source snapshot from
 * @param srcid    id of the source snapshot
 * @param distnid  node holding the destination snapshot, or NULL
 * @param distid   id of the destination snapshot
 * @param idx      chunk index
 * @param max      upper bound passed on to the copy helper
 * @param ec       erasure-coding parameters passed on to the copy helper
 * @return 0 on success, otherwise an errno value
 */
static int __volume_proto_snapshot_cleanup_chunk(const nid_t *srcnid,
                                                 const fileid_t *srcid, const nid_t *distnid,
                                                 const fileid_t *distid, int idx, uint64_t max, const ec_t *ec)
{
        int ret, present = 0;

        if (distnid == NULL)
                return 0;

        ret = __volume_proto_snapshot_chunk_exist(distnid, distid, idx, &present);
        if (unlikely(ret)) {
                DWARN("check "CHKID_FORMAT"[%u] @ %s (%u) %s\n", CHKID_ARG(distid), idx,
                      network_rname(distnid), ret, strerror(ret));
                ret = _errno(ret);
                GOTO(err_ret, ret);
        }

        if (!present) {
                /* destination lacks the chunk: pull it over from the source */
                ret = __volume_proto_snapshot_cleanup_chunk1(srcnid, srcid, distnid,
                                                             distid, idx, max, ec);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Look up the snapshot called @name in the volume's table1.
 *
 * @param volume_proto  owning volume
 * @param name          snapshot name
 * @param chkinfo       out: passed to table1->snapshot_lookup
 * @param snap_version  out: passed to table1->snapshot_lookup; callers in
 *                      this file also pass NULL here
 * @return 0 on success, otherwise the error from the lookup
 */
int volume_proto_snapshot_cleanup_bh_src(volume_proto_t *volume_proto, const char *name,
                chkinfo_t *chkinfo, uint64_t *snap_version)
{
        int ret;
        table1_t *tbl;

        ANALYSIS_BEGIN(0);

        tbl = &volume_proto->table1;
        ret = tbl->snapshot_lookup(tbl, name, chkinfo, snap_version);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}

/**
 * Find the snapshot that table1->snapshot_prev reports for @fileid and
 * return its id together with the node of its first disk entry.
 *
 * @param volume_proto  owning volume
 * @param fileid        snapshot whose predecessor is wanted
 * @param distnid       out: diskid[0].id of the predecessor's chunk info
 * @param distid        out: id of the predecessor
 * @return 0 on success, otherwise the error from snapshot_prev or
 *         snapshot_getinfo
 */
int volume_proto_snapshot_cleanup_bh_dist(volume_proto_t *volume_proto, const fileid_t *fileid,
                nid_t *distnid, fileid_t *distid)
{
        int ret;
        char storage[CHKINFO_MAX];
        chkinfo_t *prev_info = (void *)storage;
        table1_t *tbl;

        ANALYSIS_BEGIN(0);

        tbl = &volume_proto->table1;

        ret = tbl->snapshot_prev(tbl, fileid, distid, NULL, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = tbl->snapshot_getinfo(tbl, distid, prev_info);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *distnid = prev_info->diskid[0].id;

        ANALYSIS_END(0, IO_WARN, NULL);

        return 0;
err_ret:
        return ret;
}

#if 0
/*
 * Disabled variant of the snapshot cleanup step: the enclosing "#if 0"
 * compiles it out in favour of the "#else" branch that follows.
 *
 * It looks the snapshot up, asks the node holding it (locally or over RPC)
 * to clean it up, and then removes the entry through table_proto->snap.
 *
 * NOTE(review): stale -- the call to volume_proto_snapshot_cleanup_bh_src
 * below passes three arguments while the current definition takes four.
 */
static int __volume_proto_snapshot_cleanup_bh2(volume_proto_t *volume_proto, const char *name)
{
        int ret;
        table1_t *table1;
        table_proto_t *table_proto;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        ret = volume_proto_snapshot_cleanup_bh_src(volume_proto, name, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* run the cleanup where the snapshot lives */
        if (net_islocal(&chkinfo->diskid[0].id)) {
                ret = volume_ctl_snapshot_cleanup(&chkinfo->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = stor_rpc_snapshot_cleanup(&chkinfo->diskid[0].id, &chkinfo->id);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        table1 = &volume_proto->table1;
        table_proto = table1->table_proto;
        ret = table_proto->snap->remove(table_proto, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#else

/**
 * Register the volume behind a snapshot under the directory returned by
 * rmvol_bh_root and re-parent the snapshot's chunk to that directory.
 *
 * The entry is named after the snapshot's chunk id.  An already existing
 * entry (EEXIST from md_mkvolwith) and a missing chunk (ENOENT from
 * chunk_proto_setparent) are logged and tolerated.
 *
 * @param volume_proto  owning volume (supplies the pool)
 * @param chkinfo       chunk info of the snapshot being cleaned up
 * @return 0 on success, otherwise an errno value
 */
static int __volume_proto_snapshot_cleanup_bh2__(volume_proto_t *volume_proto, const chkinfo_t*chkinfo)
{
        int ret;
        fileid_t rmroot;
        char entry[MAX_NAME_LEN];

        ret = rmvol_bh_root(volume_proto->table1.pool, &rmroot);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        snprintf(entry, MAX_NAME_LEN, CHKID_FORMAT, CHKID_ARG(&chkinfo->id));

        ret = md_mkvolwith(&rmroot, entry, chkinfo);
        if (unlikely(ret)) {
                if (ret != EEXIST)
                        GOTO(err_ret, ret);

                DWARN("%s exist\n", entry);
        }

        DINFO("create %s @ "CHKID_FORMAT"\n", entry, CHKID_ARG(&rmroot));

        ret = chunk_proto_setparent(chkinfo, &rmroot);
        if (unlikely(ret)) {
                if (ret != ENOENT)
                        GOTO(err_ret, ret);

                DWARN("%s not exist\n", entry);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Re-parent everything that hangs off snapshot @snap_ver onto @snap_from.
 *
 * Every child listed by table1->snapshot_listchild gets its parent updated
 * to @snap_from; if the volume itself is based on @snap_ver
 * (fileinfo.snap_from == snap_ver) its snap_from is updated as well.
 *
 * The child list is released exactly once: the error path taken after the
 * list has already been freed skips the second release.
 *
 * @param volume_proto  owning volume
 * @param snap_ver      version of the snapshot being removed
 * @param snap_from     version the children should point at afterwards
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_cleanup_updateparent(volume_proto_t *volume_proto,
                uint64_t snap_ver, uint64_t snap_from)
{
        int ret;
        table1_t *table1;
        struct list_head list;
        struct list_head *pos;
        snap_t *snap;

        INIT_LIST_HEAD(&list);

        table1 = &volume_proto->table1;
        ret = table1->snapshot_listchild(table1, snap_ver, &list);
        if (unlikely(ret))
                GOTO(err_free, ret);

        list_for_each(pos, &list) {
                snap = (void *)pos;

                DINFO("vol %s snap %s %jd %jd => %jd\n", id2str(&volume_proto->chkid),
                      snap->key,
                      snap->snap_version,
                      snap->snap_from,
                      snap_from);
                ret = table1->snapshot_updateparent(table1, snap->key, snap_from);
                if (unlikely(ret))
                        GOTO(err_free, ret);

        }

        __snapshot_list_free(&list);

        /* the volume counts as a child too when it is based on snap_ver */
        if (table1->fileinfo.snap_from == snap_ver) {
                DINFO("vol %s %jd %jd => %jd\n", id2str(&volume_proto->chkid),
                      table1->fileinfo.snap_version,
                      table1->fileinfo.snap_from,
                      snap_from);
                ret = table1->snapshot_setfrom(table1, snap_from);
                if (unlikely(ret))
                        GOTO(err_ret, ret); /* list already released above */
        }

        return 0;
err_free:
        __snapshot_list_free(&list);
err_ret:
        return ret;
}

/**
 * Remove the snapshot called @name from the snapshot tree.
 *
 * Steps, in order: look the snapshot up, hand its backing volume to the
 * removal directory, find its parent version, re-parent its children onto
 * that parent, remove the snapshot entry, then drop auto snapshots and the
 * root snapshot at the parent if they are no longer needed.
 *
 * NOTE(review): when snapshot_prev reports ENOENT, prev_ver is set to -1 and
 * that value is still passed to the remove_auto / remove_root steps --
 * confirm that the table lookups accept it.
 *
 * @param volume_proto  owning volume
 * @param name          name of the snapshot to remove
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_cleanup_bh2(volume_proto_t *volume_proto, const char *name)
{
        int ret;
        fileid_t prev_id;
        uint64_t prev_ver, snap_ver;
        table1_t *table1;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;

        ret = volume_proto_snapshot_cleanup_bh_src(volume_proto, name, chkinfo, &snap_ver);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // mark the lich volume backing the snapshot as deleted, i.e. register it
        // under the unlink directory
        ret = __volume_proto_snapshot_cleanup_bh2__(volume_proto, chkinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        table1 = &volume_proto->table1;
        ret = table1->snapshot_prev(table1, &chkinfo->id, &prev_id, NULL, &prev_ver);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        /* no parent found: use -1 as the parent version */
                        prev_ver = -1;
                } else
                        GOTO(err_ret, ret);
        }

        // re-point the parent of every child (snapshots, auto snapshots and the
        // volume) at the parent of the node being removed
        ret = __volume_proto_snapshot_cleanup_updateparent(volume_proto, snap_ver, prev_ver);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table1->snapshot_remove(table1, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // only leaf snapshots keep an auto snap
        // if the children of snap_from include the volume or a snapshot, delete the auto snap
        ret = __volume_proto_snapshot_remove_auto(volume_proto, prev_ver, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // the root snapshot is a hidden snapshot, created before the first user
        // snapshot to serve as the root of the snapshot tree
        // remove it once it has no children
        ret = __volume_proto_snapshot_remove_root(volume_proto, prev_ver);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Remove the snapshot called @name without touching its relatives: look it
 * up, hand its backing volume to the removal directory, and delete the
 * snapshot entry from table1.
 *
 * @param volume_proto  owning volume
 * @param name          name of the snapshot to remove
 * @return 0 on success, otherwise an errno value
 */
STATIC int __volume_proto_snapshot_cleanup_direct(volume_proto_t *volume_proto, const char *name)
{
        int ret;
        char storage[CHKINFO_MAX];
        chkinfo_t *info = (void *)storage;
        table1_t *tbl = &volume_proto->table1;

        ret = volume_proto_snapshot_cleanup_bh_src(volume_proto, name, info, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __volume_proto_snapshot_cleanup_bh2__(volume_proto, info);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = tbl->snapshot_remove(tbl, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#endif

/**
 * Forward to table1->snapshot_isempty for this volume.
 *
 * @param volume_proto  owning volume
 * @param empty         out: filled in by table1->snapshot_isempty
 * @return 0 on success, otherwise the error from the table
 */
STATIC int __volume_proto_snapshot_isempty(volume_proto_t *volume_proto, int *empty)
{
        int ret;
        table1_t *tbl = &volume_proto->table1;

        ret = tbl->snapshot_isempty(tbl, empty);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief Read data for a chunk from the source snapshot of this volume.
 *
 * The source is taken from the LICH_SYSTEM_ATTR_SOURCE extended attribute,
 * which is split at '/' into two chunk-id strings (parent and snapshot
 * file).  An attribute without both parts is rejected with EINVAL instead of
 * being parsed from an unset pointer.
 *
 * @param volume_proto volume whose source snapshot is read
 * @param chkid chunk being read; its index selects the byte range
 * @param buf receives the data
 * @param _size number of bytes to read
 * @param _offset offset inside the chunk
 * @param fillzero if chunk not allocate, is fill zero to buff.
 *
 * @return 0 on success, otherwise an errno value.  Per the original note,
 *         when there is no data the buffer comes back zero-filled with 0.
 *         NOTE(review): presumably only when fillzero is set -- the clone
 *         caller passes FALSE and handles ENOENT.
 */
int volume_proto_snapshot_chunk_redirect(volume_proto_t *volume_proto, const chkid_t *chkid,
                                       buffer_t *buf, int _size, int _offset, BOOL fillzero)
{
        int ret, valuelen, count;
        table1_t *table1;
        char value[MAX_NAME_LEN];
        char *list[2] = {NULL, NULL};
        fileid_t parent, fileid;
        off_t offset;
        size_t size;

        // TODO no need to compute this every time: derive parent and fileid once
        // when the clone volume is loaded
        table1 = &volume_proto->table1;
        valuelen = MAX_NAME_LEN;
        ret = table1->xattr_get(table1, LICH_SYSTEM_ATTR_SOURCE, value, &valuelen);
        if (unlikely(ret)) {
                if (ret == ENOKEY) {
                        DERROR("volume "CHKID_FORMAT" source not found\n",
                               CHKID_ARG(&volume_proto->chkid));
                }
                GOTO(err_ret, ret);
        }

        count = 2;
        _str_split(value, '/', list, &count);
        /* both halves are required; a value without '/' leaves list[1] unset */
        if (unlikely(list[0] == NULL || list[1] == NULL)) {
                DERROR("volume "CHKID_FORMAT" source malformed\n",
                       CHKID_ARG(&volume_proto->chkid));
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        str2chkid(&parent, list[0]);
        str2chkid(&fileid, list[1]);

        /* translate the in-chunk offset into an offset within the whole volume */
        offset = (off_t)_offset + (off_t)chkid->idx * LICH_CHUNK_SPLIT;
        size = (size_t)_size;

        // what is the pool used for here?
        ret = volume_snapshot_read(volume_proto->table1.pool, &parent, &fileid, buf, size, offset, fillzero);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * @brief: Decide whether the data in a buffer has to be written: if the I/O
 *         data is all zero and the chunk is not allocated, it does not need
 *         to be written to disk.
 *
 * Each iovec segment of @buf is compared against @zero with memcmp, so
 * @zero must be readable for at least the length of the longest segment.
 * A NULL @zero disables the check and always requests a write.
 *
 * @param buf buffer to inspect
 * @param needwrite out: 1 if any segment differs from @zero (or @zero is
 *                  NULL), 0 otherwise
 * @param zero reference block of zero bytes, or NULL
 * @return always 0
 */
STATIC int check_buffer_data_is_zero(buffer_t *buf, int *needwrite, void *zero)
{
        int mapped, idx, nvec = LICH_IOV_MAX;
        struct iovec vec[LICH_IOV_MAX];

        if (zero == NULL) {
                *needwrite = 1;
                return 0;
        }

        mapped = mbuffer_trans(vec, &nvec, buf);
        YASSERT(mapped == buf->len);

        /* stop at the first segment that is not identical to the reference */
        for (idx = 0; idx < nvec; idx++) {
                if (memcmp(vec[idx].iov_base, zero, vec[idx].iov_len) != 0)
                        break;
        }

        *needwrite = (idx < nvec) ? 1 : 0;

        return 0;
}

/**
 * @brief Materialize one chunk of a cloned volume from its source snapshot.
 *
 * A full chunk (LICH_CHUNK_SPLIT bytes at offset 0) is read through
 * volume_proto_snapshot_chunk_redirect(). When data is returned, the chunk is
 * created together with that data via table2->chunk_createwith(). When the
 * read reports ENOENT there is nothing to copy; in that case the chunk is only
 * created (table2->chunk_create()) if @thin is zero. EEXIST from either create
 * call is tolerated.
 *
 * @param volume_proto volume being populated
 * @param chkid        chunk to clone
 * @param thin         non-zero: skip creating the chunk when the source has no
 *                     data for it
 * @return 0 on success, otherwise the error from the snapshot read or from the
 *         chunk create call. The temporary buffer is released on every path.
 */
int volume_proto_snapshot_chunk_clone(volume_proto_t *volume_proto, const chkid_t *chkid, int thin)
{
        int ret, localize;
        buffer_t buf;
        table2_t *table2;
        io_opt_t io_opt;
        int needwrite = 1;

        DBUG("clone "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ANALYSIS_BEGIN(0);

        mbuffer_init(&buf, 0);
        ret = volume_proto_snapshot_chunk_redirect(volume_proto, chkid, &buf, LICH_CHUNK_SPLIT, 0, FALSE);
        if (unlikely(ret)) {
                // A failed read may already have attached data to buf, so
                // leave through err_free rather than skipping the release.
                if (ret != ENOENT)
                        GOTO(err_free, ret);

                // ENOENT: the source holds no data for this chunk.
                needwrite = 0;
        }

        ANALYSIS_QUEUE(0, 1000 * 1000, "clone:snapshot_read");

        // Counted for every read attempt that did not fail hard.
        volume_proto->flat_ctx.stat.snap_read++;

        localize = !!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__);
        table2 = &volume_proto->table2;
        io_opt_init(&io_opt, localize, 0, 0, 0);

        // The all-zero shortcut is currently disabled:
        // check_buffer_data_is_zero(&buf, &needwrite, zero);
        if (needwrite) {
                ANALYSIS_BEGIN(1);

                ret = table2->chunk_createwith(table2, chkid, &io_opt, volume_proto, &buf);
                if (unlikely(ret)) {
                        if (ret != EEXIST)
                                GOTO(err_free, ret);
                }

                ANALYSIS_QUEUE(1, 1000 * 1000, "clone:chunk_createwith");

                volume_proto->flat_ctx.stat.vol_createwith++;
        } else if (!thin) {
                ANALYSIS_BEGIN(2);

                ret = table2->chunk_create(table2, chkid, 1, &io_opt, volume_proto);
                if (unlikely(ret)) {
                        if (ret != EEXIST)
                                GOTO(err_free, ret);
                }

                ANALYSIS_QUEUE(2, 1000 * 1000, "clone:chunk_create");

                volume_proto->flat_ctx.stat.vol_create++;
        }

        volume_proto->flat_ctx.stat.vol_write++;

        mbuffer_free(&buf);

        return 0;
err_free:
        mbuffer_free(&buf);
        return ret;
}

/**
 * @brief Install the snapshot operation callbacks on a volume_proto.
 *
 * Every snapshot_* function pointer assigned below is set to the
 * implementation in this file, and snapshot_lock_uuid is cleared.
 *
 * @param volume_proto object whose snapshot callbacks are filled in
 * @return always 0
 */
int volume_proto_snapshot_init(volume_proto_t *volume_proto)
{
        volume_proto_t *vp = volume_proto;

        uuid_clear(vp->snapshot_lock_uuid);

        /* basic snapshot operations */
        vp->snapshot_create = __volume_proto_snapshot_create;
        vp->snapshot_check = __volume_proto_snapshot_check;
        vp->snapshot_protect = __volume_proto_snapshot_protect;
        vp->snapshot_read = volume_proto_snapshot_read;
        vp->snapshot_remove = __volume_proto_snapshot_remove;

        /* snapshot tree: listing, lookup and navigation */
        vp->snapshot_listopen = __volume_proto_snapshot_listopen;
        vp->snapshot_list = __volume_proto_snapshot_list;
        vp->snapshot_listclose = __volume_proto_snapshot_listclose;
        vp->snapshot_lookup = __volume_proto_snapshot_lookup;
        vp->snapshot_last = __volume_proto_snapshot_last;
        vp->snapshot_prev = __volume_proto_snapshot_prev;
        vp->snapshot_isempty = __volume_proto_snapshot_isempty;

        /* snapshot tree: relinking */
        vp->snapshot_setfrom = __volume_proto_snapshot_setfrom;
        vp->snapshot_updateparent = __volume_proto_snapshot_updateparent;

        /* cleanup; snapshot_cleanup itself is intentionally left unset: */
        // vp->snapshot_cleanup = __volume_proto_snapshot_cleanup;
        vp->snapshot_cleanup_direct = __volume_proto_snapshot_cleanup_direct;
        vp->snapshot_cleanup_chunk = __volume_proto_snapshot_cleanup_chunk;
        vp->snapshot_cleanup_bh2 = __volume_proto_snapshot_cleanup_bh2;

        /* rollback and its background handlers */
        vp->snapshot_rollback = __volume_proto_snapshot_rollback;
        vp->snapshot_rollback_bh = __volume_proto_snapshot_rollback_bh;
        vp->snapshot_rollback_bh1 = __volume_proto_snapshot_rollback_bh1;
        vp->snapshot_rollback_bh2 = __volume_proto_snapshot_rollback_bh2;

        /* flatten */
        vp->snapshot_flat = volume_proto_snapshot_flat;
        vp->snapshot_flat_bh1 = volume_proto_snapshot_flat_bh1;

        return 0;
}
